Multimodal LiDAR-Camera Sensor Fusion Algorithm for Road Longitudinal Slope Estimation¶

I utilized the outputs of pretrained vision models (semantic segmentation and YOLO) and developed a pipeline involving geometric calibration, 3D-to-2D projection, and DBSCAN based clustering to precisely extract road-surface points. Traditional methods suffered from high error rates due to noise from roadside objects—such as guardrails, trees, and vehicles-being mistakenly included during image-LiDAR alignment. To address this, I developed a DBSCAN based road surface extraction algorithm that effectively removes such noise, ultimately reducing longitudinal slope estimation error by approximately 30%.

Data Availability¶

The raw data used in this project is corporate confidential data and cannot be publicly shared. Therefore, this repository contains only the analysis code (ipynb) without the original dataset. For security reasons, all sensitive information and paths have been removed.

Sample Outputs¶

Raw Inputs¶

No description has been provided for this image

Original Camera Image

No description has been provided for this image

Semantic Segmentation Output

No description has been provided for this image

Raw LiDAR Point Cloud

Final Outputs¶

No description has been provided for this image No description has been provided for this image
In [1]:
import open3d as o3d
import pandas as pd
import numpy as np
import os, json, warnings, math
import cv2
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from sklearn.cluster import DBSCAN
from sklearn.preprocessing import StandardScaler
In [2]:
# Set file paths
pcd_dir = '/Users/jiyeonhwang/Downloads/서울대 교통계획연구실/도도/도도 종단경사/테스트베드/lidar'
mask_dir = '/Users/jiyeonhwang/Downloads/서울대 교통계획연구실/도도/도도 종단경사/테스트베드/sementic_1'

origin = np.array([0.0, 0.0, -2.27], dtype=np.float32) # Set vehicle origin coordinates

road_rgb = np.array([128, 64, 128])  # Road color (RGB threshold)

BASE_POINT   = np.array([0.0, 0.0, -2.27], dtype=np.float32)

Define preprocessing functions¶

In [3]:
# Import functions required for preprocessing

# 1. Define PCD reader (xyz + intensity)
def read_pcd_xyz_intensity(path):
    pcd_t = o3d.t.io.read_point_cloud(path)
    xyz = pcd_t.point['positions'].numpy().astype(np.float32)
    if 'intensity' in pcd_t.point:
        intensity = pcd_t.point['intensity'].numpy().astype(np.float32).flatten()
    else:
        raise ValueError("intensity 필드가 존재하지 않습니다.")
    return xyz, intensity

# 2. Calibration utilities
def quat_to_R(w, x, y, z):
    return np.array([
        [1-2*(y*y+z*z),   2*(x*y - z*w),   2*(x*z + y*w)],
        [2*(x*y + z*w),   1-2*(x*x+z*z),   2*(y*z - x*w)],
        [2*(x*z - y*w),   2*(y*z + x*w),   1-2*(x*x+y*y)]
    ], dtype=np.float32)

def make_tf(ext):
    T = np.eye(4, dtype=np.float32)
    T[:3,:3] = quat_to_R(ext["w"], ext["x"], ext["y"], ext["z"])
    T[:3,3]  = [ext["tx"], ext["ty"], ext["tz"]]
    return T

# 3. Load calibration parameters
calib_path = '/Users/jiyeonhwang/Downloads/서울대 교통계획연구실/도도/도도 종단경사/지연 test 파일/Calib.json'
with open(calib_path, 'r') as f:
    calib = json.load(f)

intr = calib["01_camera"]["3_intrinsic"]
extr = calib["01_camera"]["4_extrinsic"]

K = np.array([
    [intr["fx"], intr.get("skew",0.0), intr["cx"]],
    [0,           intr["fy"],           intr["cy"]],
    [0,           0,                     1     ]
], dtype=np.float32)

dist = np.array([
    intr["k1"], intr["k2"], intr["p1"], intr["p2"], intr.get("k3", 0.0)
], dtype=np.float32)

tf = make_tf(extr)

# 4. Define file paths and ROI

ROI_X = (-20.0, 20.0)
ROI_Y = (-100.0, 0.0)
ROI_Z = (-20.0, 20.0)

road_rgb = np.array([128, 64, 128])


# # trimming 비율
# TRIM_RATIO_X = 0.10
# TRIM_RATIO_Z = 0.05

# # trim 함수
# def trim_extremes(idxs, vals, ratio):
#     N = len(idxs)
#     k = int(math.floor(ratio * N))
#     if N < 2*k+1:
#         return idxs
#     order = np.argsort(vals[idxs])
#     return idxs[order[k:N-k]]
In [4]:
# Create a list of files to process
file_list = sorted([f for f in os.listdir(pcd_dir) if f.endswith('pcd')])

Projection + Alignment Preprocessing Loop¶

In [ ]:
from tqdm import tqdm

data_dict = {} # Save projection and alignment results in data_dict

for file_name in tqdm(file_list):
    try:
        pcd_path = os.path.join(pcd_dir, file_name)
        mask_path = os.path.join(mask_dir, file_name.replace('.pcd', '_mask_rgb.png'))

        # Load PCD (xyz + intensity)
        xyz_all, intensity_all = read_pcd_xyz_intensity(pcd_path)

        # Camera alignment (projection)
        hom_all = np.hstack([xyz_all, np.ones((len(xyz_all), 1), np.float32)])
        cam_all = (tf @ hom_all.T).T[:, :3]
        mask_f = cam_all[:, 2] > 0
        xyz_all, cam_all = xyz_all[mask_f], cam_all[mask_f]
        intensity_all = intensity_all[mask_f]

        # Project to 2D image coordinates
        px, _ = cv2.projectPoints(cam_all.astype(np.float32),
                                  np.zeros((3,1)), np.zeros((3,1)),
                                  K, dist)
        uv = px.reshape(-1, 2).astype(int)

        seg_bgr = cv2.imread(mask_path)
        seg_rgb = cv2.cvtColor(seg_bgr, cv2.COLOR_BGR2RGB)
        H, W = seg_rgb.shape[:2]

        valid = (
            (0 <= uv[:,0]) & (uv[:,0] < W) &
            (0 <= uv[:,1]) & (uv[:,1] < H)
        )
        seg_col = np.zeros((len(uv), 3), np.uint8)
        seg_col[valid] = seg_rgb[uv[valid,1], uv[valid,0]]

        # ROI and road color filtering
        roi_mask = (
            (ROI_X[0] <= xyz_all[:,0]) & (xyz_all[:,0] <= ROI_X[1]) &
            (ROI_Y[0] <= xyz_all[:,1]) & (xyz_all[:,1] <= ROI_Y[1]) &
            (ROI_Z[0] <= xyz_all[:,2]) & (xyz_all[:,2] <= ROI_Z[1])
        )
        sem_road = np.all(np.abs(seg_col.astype(int) - road_rgb) <= 20, axis=1)
        candidate_idx = np.where(roi_mask & sem_road)[0]
        road_pts = xyz_all[candidate_idx]
        intensity_roi = intensity_all[candidate_idx]
    
        # # trimming
        # idx_x  = trim_extremes(np.arange(len(road_pts)), road_pts[:,0], TRIM_RATIO_X)
        # idx_xz = trim_extremes(idx_x, road_pts[:,2], TRIM_RATIO_Z)
        # final_idx = candidate_idx[idx_xz]
        # trimmed_out = np.setdiff1d(candidate_idx, final_idx) # 선택되지 않은 인덱스들 -> trim으로 없어진 영역

        # # trim 된거
        # road_pts = xyz_all[final_idx]
        # intensity_roi = intensity_all[final_idx]

        if len(road_pts) == 0:
            print(f"{file_name} : 도로 점 없음")
            continue

        # Extract the farthest road point
        local_far_idx = np.argmin(road_pts[:,1])
        global_far_idx = candidate_idx[local_far_idx]
        far_pt = road_pts[local_far_idx]

        data_dict[file_name] = { # Store preprocessing results in dictionary
            "xyz_all": xyz_all,
            "intensity_all": intensity_all,
            "road_pts": road_pts,
            "intensity_roi": intensity_roi,
            "final_idx": candidate_idx,
            "global_far_idx": global_far_idx,
            "far_pt": far_pt
        }

    except Exception as e:
        print(f"[에러] {file_name}: {e}")
  0%|          | 0/100 [00:00<?, ?it/s]/var/folders/57/dv1c_0xn3xs1tzzxdvgp0pth0000gn/T/ipykernel_7874/1114291487.py:24: RuntimeWarning: invalid value encountered in cast
  uv = px.reshape(-1, 2).astype(int)
100%|██████████| 100/100 [00:33<00:00,  3.00it/s]

Loop for generating normalized features for DBSCAN¶

In [6]:
# Normalize features only

features_dict = {} # Dictionary to store normalized features


for i in file_list:
    # Combine (N,3) road points with (N,1) intensity -> (N,4)
    features = np.hstack([data_dict[i]['road_pts'], data_dict[i]['intensity_roi'].reshape(-1, 1)])
    
    # Normalize features (x, y, z, intensity have different scales)
    scaler = StandardScaler()
    features_scaled = scaler.fit_transform(features)

    features_dict[i] = features_scaled
In [7]:
# Apply weights to the normalized features

features_w_dict = {} # Dictionary to store weighted features
weights = np.array([0.5, 0.5, 4.0, 1.0])

for i in file_list:
    features_weighted = features_dict[i] * weights
    features_w_dict[i] = features_weighted

DBSCAN 1: Using normalized features¶

In [8]:
# eps: distance threshold, min_samples: minimum number of points

db = DBSCAN(eps=0.3, min_samples=10)
labels = db.fit_predict(features_dict[file_list[3]])

unique_labels = np.unique(labels)
colors = np.zeros((len(labels), 3)) 

colormap = plt.cm.get_cmap('Set3', len(unique_labels)) 
for idx, label in enumerate(unique_labels):
    if label == -1:
        colors[labels == label] = [0.5, 0.5, 0.5]  # noise points -> gray
    else:
        colors[labels == label] = colormap(idx)[:3]


# Visualization
pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(data_dict[file_list[3]]['road_pts'])
pcd.colors = o3d.utility.Vector3dVector(colors)

o3d.visualization.draw_geometries([pcd])
/var/folders/57/dv1c_0xn3xs1tzzxdvgp0pth0000gn/T/ipykernel_7874/521401732.py:9: MatplotlibDeprecationWarning: The get_cmap function was deprecated in Matplotlib 3.7 and will be removed in 3.11. Use ``matplotlib.colormaps[name]`` or ``matplotlib.colormaps.get_cmap()`` or ``pyplot.get_cmap()`` instead.
  colormap = plt.cm.get_cmap('Set3', len(unique_labels))

Visualize DBSCAN clustering results together with the full point cloud¶

In [9]:
target_file = file_list[3]

db = DBSCAN(eps=0.3, min_samples=10)
labels = db.fit_predict(features_dict[target_file])

unique_labels = np.unique(labels)


xyz_all      = data_dict[target_file]['xyz_all']     
candidate_idx = data_dict[target_file]['final_idx']
colors_full = np.ones((len(xyz_all), 3)) * 0.5   


colormap = plt.cm.get_cmap('Set3', len(unique_labels))

for idx, label in enumerate(unique_labels):

    if label == -1:
        continue

    cluster_color = np.array(colormap(idx)[:3])

    local_mask = (labels == label)

    global_idx = candidate_idx[local_mask]

    colors_full[global_idx] = cluster_color

pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(xyz_all)
pcd.colors = o3d.utility.Vector3dVector(colors_full)

o3d.visualization.draw_geometries([pcd])
/var/folders/57/dv1c_0xn3xs1tzzxdvgp0pth0000gn/T/ipykernel_7874/3966926208.py:14: MatplotlibDeprecationWarning: The get_cmap function was deprecated in Matplotlib 3.7 and will be removed in 3.11. Use ``matplotlib.colormaps[name]`` or ``matplotlib.colormaps.get_cmap()`` or ``pyplot.get_cmap()`` instead.
  colormap = plt.cm.get_cmap('Set3', len(unique_labels))
No description has been provided for this image

DBSCAN 2: Using normalized and weighted features¶

In [10]:
test_file = file_list[3]

db_w = DBSCAN(eps=0.2, min_samples=10)
labels_w = db_w.fit_predict(features_w_dict[test_file])

unique_labels_w = np.unique(labels_w)
colors_w = np.zeros((len(labels_w), 3)) 

colormap_w = plt.cm.get_cmap('Set3', len(unique_labels_w))  
for idx_w, label_w in enumerate(unique_labels_w):
    if label_w == -1:
        colors_w[labels_w == label_w] = [0.5, 0.5, 0.5]
    else:
        colors_w[labels_w == label_w] = colormap_w(idx_w)[:3]

pcd_w = o3d.geometry.PointCloud()
pcd_w.points = o3d.utility.Vector3dVector(data_dict[test_file]['road_pts'])
pcd_w.colors = o3d.utility.Vector3dVector(colors_w)

o3d.visualization.draw_geometries([pcd_w])
/var/folders/57/dv1c_0xn3xs1tzzxdvgp0pth0000gn/T/ipykernel_7874/505877789.py:9: MatplotlibDeprecationWarning: The get_cmap function was deprecated in Matplotlib 3.7 and will be removed in 3.11. Use ``matplotlib.colormaps[name]`` or ``matplotlib.colormaps.get_cmap()`` or ``pyplot.get_cmap()`` instead.
  colormap_w = plt.cm.get_cmap('Set3', len(unique_labels_w))

Compute longitudinal slope after DBSCAN-based noise removal¶

In [11]:
from collections import Counter

# Select the largest cluster (excluding -1 which represents noise)
label_counter = Counter(labels_w)
label_counter.pop(-1, None)  # remove noise label
road_label = label_counter.most_common(1)[0][0]  # label with the most points

# Extract only the main road cluster
main_road = labels_w == road_label
main_road_pts = data_dict[test_file]['road_pts'][main_road]
In [12]:
test_far_idx = np.argmin(data_dict[test_file]['road_pts'][:,1])
test_far_idx
# global_far_idx = candidate_idx[local_far_idx]
# far_pt = road_pts[local_far_idx]
Out[12]:
np.int64(4989)
In [13]:
data_dict[test_file]['road_pts'][:,1]
Out[13]:
array([-11.738706 , -11.199067 , -12.355909 , ...,  -6.6102333,
        -7.166094 ,  -6.880293 ], dtype=float32)
In [14]:
test_far_pts = data_dict[test_file]['road_pts'][test_far_idx]
test_far_pts
Out[14]:
array([  2.9195569, -99.54187  ,  -2.7990465], dtype=float32)
In [15]:
test_far_pts[:2]
Out[15]:
array([  2.9195569, -99.54187  ], dtype=float32)
In [16]:
BASE_POINT[2]
Out[16]:
np.float32(-2.27)
In [ ]:
horiz = np.linalg.norm(test_far_pts[:2] - BASE_POINT[:2])
vert  = test_far_pts[2] - BASE_POINT[2]
slope_ratio = (vert / horiz)*100 if horiz != 0 else 0.0
dist      = np.linalg.norm(far_pt - BASE_POINT)

print(horiz, vert, slope_ratio, dist)
99.58467 -0.52904654 -0.531253 100.59322
The Kernel crashed while executing code in the current cell or a previous cell. 

Please review the code in the cell(s) to identify a possible cause of the failure. 

Click <a href='https://aka.ms/vscodeJupyterKernelCrash'>here</a> for more info. 

View Jupyter <a href='command:jupyter.viewOutput'>log</a> for further details.
In [19]:
from collections import Counter
import pandas as pd

BASE_POINT = np.array([0.0, 0.0, -2.27], dtype=np.float32)

results = []

for fname in file_list:
    try:
        road_pts = data_dict[fname]['road_pts']
        intensity = data_dict[fname]['intensity_roi']

        if len(road_pts) < 10:
            continue  # Skip files with too few road points

        # Normalize + apply feature weights
        features = np.hstack([road_pts, intensity.reshape(-1,1)])
        features = StandardScaler().fit_transform(features)
        weights = np.array([0.5, 0.5, 4.0, 1.0])
        features *= weights

        # DBSCAN clustering
        labels = DBSCAN(eps=0.2, min_samples=10).fit_predict(features)

        # Identify the largest cluster (excluding noise)
        label_counter = Counter(labels)
        label_counter.pop(-1, None)  # remove noise (-1)
        if not label_counter:
            continue
        road_label = label_counter.most_common(1)[0][0]
        main_mask = labels == road_label
        main_road_pts = road_pts[main_mask]

        # Extract the farthest point along the longitudinal axis (min y)
        far_idx = np.argmin(main_road_pts[:,1])
        far_pt = main_road_pts[far_idx]

        # Compute horizontal distance, vertical rise, slope (%), and 3D distance
        horiz = np.linalg.norm(far_pt[:2] - BASE_POINT[:2])
        vert = far_pt[2] - BASE_POINT[2]
        slope = (vert / horiz) * 100 if horiz != 0 else 0.0
        dist = np.linalg.norm(far_pt - BASE_POINT)

        results.append({
            'file': fname,
            'slope_percent(%)': slope,
            'distance_m': dist
        })

    except Exception as e:
        print(f"[에러] {fname}: {e}")

df_result = pd.DataFrame(results)
df_result
Out[19]:
file slope_percent(%) distance_m
0 364800.099993.pcd -0.122479 71.837372
1 364802.100011.pcd -0.318304 66.746185
2 364803.700000.pcd -0.012193 69.414627
3 364804.999997.pcd -0.307521 59.249557
4 364806.099978.pcd -0.498715 58.696812
... ... ... ...
95 367390.900000.pcd -0.301984 55.855045
96 367391.600000.pcd -0.213143 54.660049
97 367392.299998.pcd -0.886404 58.215660
98 367393.000014.pcd -0.601244 60.295650
99 367393.699993.pcd -0.675602 61.510811

100 rows × 3 columns